package com.niit.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerTest {

    /**
     * Demo producer: synchronously sends the numbers 1..9999 as string messages
     * to the Kafka topic {@code BD2_4}, blocking on the broker acknowledgement
     * for each record and pausing 5 seconds between sends.
     *
     * @param args unused
     * @throws ExecutionException   if the broker reports a failure for a sent record
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for an acknowledgement or sleeping
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Configuration for connecting to Kafka.
        Properties props = new Properties();
        // Kafka broker to connect to.
        props.put("bootstrap.servers", "node1:9092");
        // Require acknowledgement from all in-sync replicas.
        props.put("acks", "all");
        // Enable producer idempotence to avoid duplicate records on retry.
        props.put("enable.idempotence", true);
        // Serialize message keys as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serialize message values as strings.
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer in try-with-resources so it is always closed
        //    (flushing buffered records) even if a send fails mid-loop —
        //    previously close() was skipped when send()/get() threw.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. Send messages 1..9999 to topic BD2_4 (null key → broker-side partitioning).
            for (int i = 1; i <= 9999; i++) {
                // 3.1 Build the record for this message.
                ProducerRecord<String, String> record = new ProducerRecord<>("BD2_4", null, i + "");
                // 3.2 Send asynchronously, obtaining a Future for the ack.
                Future<RecordMetadata> ack = producer.send(record);
                // 3.3 Block until the broker acknowledges the record (synchronous send).
                ack.get();
                System.out.println("第" + i + "条数据生产成功！！");
                // Pace production: one record every 5 seconds.
                Thread.sleep(5000);
            }
        }
        // 4. Producer closed automatically by try-with-resources.
    }
}
